﻿using CRL.Core;
using CRL.Core.Extension;
using CRL;
using CRL.Attribute;
using CRL.DBAccess;
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace CRL.EventBus.Queue
{
    /// <summary>
    /// Settings for the database-backed queue implementation in this file.
    /// </summary>
    public class DbQueueConfig : ConfigBase
    {
        /// <summary>
        /// When true, subscriptions are grouped by name so each name is polled by its own consumer thread;
        /// when false, subscriptions that share the same polling interval share one thread.
        /// </summary>
        public bool ConsumerByNameSingle { get; set; }
        /// <summary>
        /// Database type used for the queue storage.
        /// NOTE(review): not read anywhere in this file — confirm where it is consumed.
        /// </summary>
        public DBType DBType { get; set; }
    }
    class DbQueue : AbsQueue
    {
        DbQueueConfig _queueConfig;
        List<Core.ThreadWork> threads = new List<ThreadWork>();
        public override MQType MQType =>  MQType.Db;
        /// <summary>
        /// Creates the database-backed queue and makes sure its backing table exists.
        /// </summary>
        /// <param name="queueConfig">Queue settings; must be a <see cref="DbQueueConfig"/> instance.</param>
        /// <exception cref="ArgumentNullException"><paramref name="queueConfig"/> is null.</exception>
        /// <exception cref="ArgumentException"><paramref name="queueConfig"/> is not a <see cref="DbQueueConfig"/>.</exception>
        public DbQueue(ConfigBase queueConfig)
        {
            if (queueConfig == null)
            {
                throw new ArgumentNullException(nameof(queueConfig));
            }
            var dbConfig = queueConfig as DbQueueConfig;
            if (dbConfig == null)
            {
                // Fail fast here: a silently-null _queueConfig would otherwise only surface later
                // as a NullReferenceException inside publish/consume.
                throw new ArgumentException($"DbQueue requires a {nameof(DbQueueConfig)} but received {queueConfig.GetType().FullName}", nameof(queueConfig));
            }
            _queueConfig = dbConfig;
            CheckQueueTable();
        }
        /// <summary>
        /// Creates the queue table through the provider and then issues one throw-away lookup against it.
        /// </summary>
        /// <remarks>
        /// The lookup result is discarded. NOTE(review): presumably this is a warm-up / sanity check of the
        /// table mapping — confirm the intent.
        /// </remarks>
        internal void CheckQueueTable()
        {
            var provider = new EventBusDataQueueManage();
            provider.CreateTable();
            // Index creation is currently switched off.
            //provider.CreateTableIndex();
            var nos = new List<string> { "11" };
            provider.QueryItem(b => b.Id > 0 && nos.Contains(b.RoutingKey));
        }
        /// <summary>
        /// Stops every consumer worker that was started by <c>ConfirmConsume</c>.
        /// </summary>
        public override void Dispose()
        {
            threads.ForEach(worker => worker.Stop());
        }

        /// <summary>
        /// Stores the given messages as rows in the queue table under <paramref name="routingKey"/>.
        /// </summary>
        /// <param name="routingKey">Routing key written to every row.</param>
        /// <param name="msgs">Messages to publish; each one is serialized with <c>ToJson()</c>.</param>
        /// <param name="optFunc">Optional callback that customizes delay, priority and message key.</param>
        /// <remarks>
        /// The due time of each row is now plus <c>DelaySecond</c> seconds when that is positive,
        /// otherwise now plus <c>DelayMs</c> milliseconds. A single message is inserted with <c>Add</c>,
        /// several with <c>BatchInsert</c>; an empty sequence writes nothing.
        /// </remarks>
        public override void PublishList(string routingKey, IEnumerable<object> msgs, Action<PublishOption> optFunc = null)
        {
            var opt = new PublishOption();
            optFunc?.Invoke(opt);
            var manage = new EventBusDataQueueManageWriter();
            var time = opt.DelaySecond > 0 ? DateTime.Now.AddSeconds(opt.DelaySecond) : DateTime.Now.AddMilliseconds(opt.DelayMs);
            // Same for every row, so compute it once instead of per message.
            var delayMs = opt.DelaySecond > 0 ? opt.DelaySecond * 1000 : opt.DelayMs;
            // Materialize once: the projection serializes each message, and the source may be a
            // one-shot enumerable, so it must not be enumerated repeatedly.
            var list = msgs.Select(b => new _EventBusDataQueueWriter
            {
                Data = b.ToJson(),
                GroupName = _queueConfig.GroupName,
                Priority = opt.Priority,
                RoutingKey = routingKey,
                Time = time,
                DelayMs = delayMs,
                MsgKey = opt.MsgKey
            }).ToList();
            if (list.Count == 0)
            {
                // Nothing to publish.
                return;
            }
            if (list.Count == 1)
            {
                manage.Add(list[0]);
                return;
            }
            manage.BatchInsert(list);
        }
        /// <summary>
        /// Asynchronously stores the given messages as rows in the queue table under <paramref name="routingKey"/>.
        /// </summary>
        /// <param name="routingKey">Routing key written to every row.</param>
        /// <param name="msgs">Messages to publish; each one is serialized with <c>ToJson()</c>.</param>
        /// <param name="optFunc">Optional callback that customizes delay, priority and message key.</param>
        /// <remarks>
        /// The due time of each row is now plus <c>DelaySecond</c> seconds when that is positive,
        /// otherwise now plus <c>DelayMs</c> milliseconds. A single message is inserted with <c>AddAsync</c>,
        /// several with <c>BatchInsertAsync</c>; an empty sequence writes nothing.
        /// </remarks>
        public override async Task PublishListAsync(string routingKey, IEnumerable<object> msgs, Action<PublishOption> optFunc = null)
        {
            var opt = new PublishOption();
            optFunc?.Invoke(opt);
            var manage = new EventBusDataQueueManageWriter();
            var time = opt.DelaySecond > 0 ? DateTime.Now.AddSeconds(opt.DelaySecond) : DateTime.Now.AddMilliseconds(opt.DelayMs);
            // Same for every row, so compute it once instead of per message.
            var delayMs = opt.DelaySecond > 0 ? opt.DelaySecond * 1000 : opt.DelayMs;
            // Materialize once: the projection serializes each message, and the source may be a
            // one-shot enumerable, so it must not be enumerated repeatedly.
            var list = msgs.Select(b => new _EventBusDataQueueWriter
            {
                Data = b.ToJson(),
                GroupName = _queueConfig.GroupName,
                Priority = opt.Priority,
                RoutingKey = routingKey,
                Time = time,
                DelayMs = delayMs,
                MsgKey = opt.MsgKey
            }).ToList();
            if (list.Count == 0)
            {
                // Nothing to publish.
                return;
            }
            if (list.Count == 1)
            {
                await manage.AddAsync(list[0]);
                return;
            }
            await manage.BatchInsertAsync(list);
        }
        /// <summary>
        /// Marks a queue row for another delivery attempt by increasing its <c>RetryTimes</c> by one.
        /// </summary>
        /// <param name="data">The queue row to retry; must be an <c>_EventBusDataQueue</c> instance.</param>
        /// <param name="msg">Not used by this implementation; the row itself is updated in place.</param>
        /// <exception cref="ArgumentException"><paramref name="data"/> is null or not an <c>_EventBusDataQueue</c>.</exception>
        public override void RePublish(IData data, object msg)
        {
            var data2 = data as _EventBusDataQueue;
            if (data2 == null)
            {
                // Without this guard the failure would be an opaque NullReferenceException below.
                throw new ArgumentException($"RePublish expects a {nameof(_EventBusDataQueue)} row", nameof(data));
            }
            var manage = new EventBusDataQueueManage();
            manage.Update(b => b.Id == data2.Id, new { RetryTimes = data2.RetryTimes + 1 });
        }
        /// <summary>
        /// Starts the consumer workers for all registered subscriptions.
        /// </summary>
        /// <remarks>
        /// Subscriptions are bucketed either by name (when <c>ConsumerByNameSingle</c> is set) or by their
        /// polling interval; every bucket is handed to one <c>ThreadWork</c> that runs <c>SubscribeData</c>.
        /// </remarks>
        public override void ConfirmConsume()
        {
            // Pick the grouping key: one worker per name, or one worker per polling interval.
            Func<EventDeclare, string> keySelector;
            if (_queueConfig.ConsumerByNameSingle)
            {
                keySelector = declare => declare.Name;
            }
            else
            {
                keySelector = declare => declare.SubscribeAttribute.ThreadSleepSecond.ToString();
            }

            foreach (var bucket in eventDeclares.GroupBy(keySelector))
            {
                var declares = bucket.ToArray();
                var worker = new Core.ThreadWork { Args = declares };
                // Short pause between worker start-ups.
                System.Threading.Thread.Sleep(10);
                // The first subscription of the bucket decides the polling interval.
                var interval = declares[0].SubscribeAttribute.ThreadSleepSecond;
                worker.Start("eventBusDBSub", SubscribeData, interval, true);
                threads.Add(worker);
            }
        }
        List<EventDeclare> eventDeclares = new List<EventDeclare>();
        /// <summary>
        /// Registers a subscription for later consumption, raising its polling interval to at least 3.
        /// </summary>
        /// <param name="eventDeclare">Subscription to register.</param>
        public override void SubscribeAsync(EventDeclare eventDeclare)
        {
            var attribute = eventDeclare.SubscribeAttribute;
            // Intervals below 3 are clamped up to 3.
            if (attribute.ThreadSleepSecond < 3)
            {
                attribute.ThreadSleepSecond = 3;
            }

            eventDeclares.Add(eventDeclare);
        }
        /// <summary>
        /// Synchronous worker callback: runs one consume pass and blocks until it has finished.
        /// </summary>
        /// <param name="args">Forwarded unchanged to <c>SubscribeDataSync</c>.</param>
        /// <returns>Always true.</returns>
        bool SubscribeData(object args)
        {
            var pass = SubscribeDataSync(args);
            pass.Wait();
            return true;
        }
        /// <summary>
        /// Fetches due rows for the given subscriptions, dispatches them to the subscriber methods and
        /// deletes the rows whose invocation was reported as successful.
        /// </summary>
        /// <param name="args">An <see cref="EventDeclare"/> array describing the subscriptions to serve.</param>
        /// <param name="msgKey">
        /// When non-empty, only rows with this message key are fetched and exceptions thrown by the
        /// subscriber are rethrown to the caller instead of being logged.
        /// </param>
        /// <returns>True once no further full batch is available.</returns>
        /// <remarks>
        /// Rows are read in batches whose size is the largest <c>BatchSize</c> among the subscriptions.
        /// As long as a batch comes back full, another batch is fetched.
        /// </remarks>
        async Task<bool> SubscribeDataSync(object args, string msgKey = "")
        {
            var eds = args as EventDeclare[];
            var names = eds.Select(b => b.Name);
            var manage = new EventBusDataQueueManage();
            while (true)
            {
                var query = manage.GetLambdaQuery().Where(b => names.Contains(b.RoutingKey) && b.Time < DateTime.Now).OrderBy(b => b.Priority, true).CompileToSp(false);
                if (!string.IsNullOrEmpty(_queueConfig.GroupName))
                {
                    query.Where(b => b.GroupName == _queueConfig.GroupName);
                }
                bool throwEx = false;
                if (!string.IsNullOrEmpty(msgKey))
                {
                    // Targeted consumption of one message: let subscriber failures reach the caller.
                    throwEx = true;
                    query.Where(b => b.MsgKey == msgKey);
                }
                var take = eds.Max(b => b.SubscribeAttribute.BatchSize);
                var allData = await query.OrderBy(b => b.Time, false).Take(take).ToListAsync();
                if (!allData.Any())
                {
                    return true;
                }
                var group = allData.GroupBy(b => b.RoutingKey);
                foreach (var g in group)
                {
                    var ed = eds.Find(b => b.Name == g.Key);
                    var attr = ed.SubscribeAttribute;
                    var instance = ed.CreateServiceInstance();
                    if (ed.IsArray)
                    {
                        // Collection subscriber: build one collection holding every row of this routing key.
                        var objInstance = DynamicMethodHelper.CreateCtorFuncFromCache(ed.EventDataType)();
                        var innerType = ed.EventDataType.GenericTypeArguments[0];
                        var method = ed.EventDataType.GetMethod("Add");
                        foreach (var m in g)
                        {
                            var item = DeserializeQueueData(m, innerType, ed);
                            method.Invoke(objInstance, new object[] { item });
                        }
                        object result = true;
                        try
                        {
                            result = await getInvokeResult(ed, instance, objInstance);
                        }
                        catch (Exception ero)
                        {
                            if (throwEx)
                            {
                                // Rethrow without resetting the stack trace.
                                throw;
                            }
                            result = false;
                            Log(ed, ero.ToString());
                        }
                        var invokeSuccess = checkInvokeSuccess(result);
                        if (invokeSuccess.invokeSuccess)
                        {
                            // Delete the whole batch on success.
                            var ids = g.Select(b => b.Id).ToArray();
                            manage.Delete(b => ids.Contains(b.Id));
                        }
                        foreach (var m in g)
                        {
                            // Deserialized again so CheckResult gets its own copy of each message.
                            var item = m.Data.ToObject(innerType);
                            CheckResult(invokeSuccess, m, ed, item);
                        }
                    }
                    else
                    {
                        // Single-message subscriber: invoke once per row.
                        foreach (var m in g)
                        {
                            var item = DeserializeQueueData(m, ed.EventDataType, ed);
                            object result = true;
                            try
                            {
                                result = await getInvokeResult(ed, instance, item);
                            }
                            catch (Exception ero)
                            {
                                if (throwEx)
                                {
                                    // Rethrow without resetting the stack trace.
                                    throw;
                                }
                                result = false;
                                Log(ed, ero.ToString());
                            }
                            var invokeSuccess = checkInvokeSuccess(result);
                            if (invokeSuccess.invokeSuccess)
                            {
                                manage.Delete(b => b.Id == m.Id);
                            }
                            CheckResult(invokeSuccess, m, ed, item);
                        }
                    }
                }
                if (take != allData.Count)
                {
                    // A partial batch means nothing more was due at query time.
                    return true;
                }
            }
        }

        /// <summary>
        /// Deserializes a row's payload into <paramref name="targetType"/>, wrapping any failure in an
        /// exception that names the subscription and keeps the original error as inner exception.
        /// </summary>
        static object DeserializeQueueData(_EventBusDataQueue m, Type targetType, EventDeclare ed)
        {
            try
            {
                return m.Data.ToObject(targetType);
            }
            catch (Exception ex)
            {
                throw new Exception($"发布和订阅类型冲突 {ex.Message} subName:{ed.Name} {targetType}=>{m.Data}", ex);
            }
        }
        /// <summary>
        /// Registers a subscription; identical to <c>SubscribeAsync</c> for this queue type.
        /// </summary>
        /// <param name="eventDeclare">Subscription to register.</param>
        public override void Subscribe(EventDeclare eventDeclare) => SubscribeAsync(eventDeclare);
        /// <summary>
        /// Deletes the given row from the queue table by its id.
        /// </summary>
        /// <param name="data">The queue row to delete; must be an <c>_EventBusDataQueue</c> instance.</param>
        /// <exception cref="ArgumentException"><paramref name="data"/> is null or not an <c>_EventBusDataQueue</c>.</exception>
        public override void DeleteData(IData data)
        {
            var data2 = data as _EventBusDataQueue;
            if (data2 == null)
            {
                // Without this guard the failure would be an opaque NullReferenceException below.
                throw new ArgumentException($"DeleteData expects a {nameof(_EventBusDataQueue)} row", nameof(data));
            }
            var manage = new EventBusDataQueueManage();
            manage.Delete(b => b.Id == data2.Id);
        }
        /// <summary>
        /// Deletes the rows whose message key equals <paramref name="msgKey"/>.
        /// </summary>
        /// <param name="msgKey">Message key to match.</param>
        /// <returns>True when at least one row was deleted.</returns>
        public override bool DeleteMsg(string msgKey)
        {
            var provider = new EventBusDataQueueManage();
            var removed = provider.Delete(b => b.MsgKey == msgKey);
            return removed > 0;
        }
        /// <summary>
        /// Deletes every row stored under the given routing key.
        /// </summary>
        /// <param name="name">Routing key whose rows are removed.</param>
        /// <returns>The value returned by the provider's delete call.</returns>
        public override long CleanQueue(string name)
        {
            var provider = new EventBusDataQueueManage();
            var removed = provider.Delete(b => b.RoutingKey == name);
            return removed;
        }
        /// <summary>
        /// Counts the rows stored under the given routing key.
        /// </summary>
        /// <param name="name">Routing key to count.</param>
        /// <returns>The number of matching rows.</returns>
        public override long GetQueueLength(string name)
        {
            var provider = new EventBusDataQueueManage();
            var pending = provider.Count(b => b.RoutingKey == name);
            return pending;
        }
        /// <summary>
        /// Immediately consumes the message identified by <paramref name="msgKey"/> and blocks until done.
        /// </summary>
        /// <param name="msgKey">Message key of the row to consume.</param>
        /// <returns>The result of the consume pass.</returns>
        /// <exception cref="Exception">No row with that key exists, or no subscription is registered for its routing key.</exception>
        public override bool ConsumeMsg(string msgKey)
        {
            var provider = new EventBusDataQueueManage();
            var row = provider.QueryItem(b => b.MsgKey == msgKey);
            if (row == null)
            {
                throw new Exception($"msgKey未找到 {msgKey}");
            }

            // Resolve the subscription that handles this row's routing key.
            var declare = SubscribeService.GetEventDeclare(row.RoutingKey);
            if (declare == null)
            {
                throw new Exception($"EventDeclare未找到 {row.RoutingKey}");
            }

            var pass = SubscribeDataSync(new EventDeclare[] { declare }, msgKey);
            return pass.Result;
        }
        /// <summary>
        /// Pulls due messages for <paramref name="routingKey"/> in batches, hands each batch to
        /// <paramref name="action"/> and deletes the rows afterwards.
        /// </summary>
        /// <typeparam name="T">Type each row's payload is deserialized into.</typeparam>
        /// <param name="routingKey">Routing key to consume.</param>
        /// <param name="take">Maximum number of rows per batch.</param>
        /// <param name="action">Callback that receives the deserialized batch.</param>
        /// <param name="msgKey">When non-empty, only rows with this message key are consumed.</param>
        /// <remarks>
        /// Rows are deleted only after <paramref name="action"/> returns; if it throws, the batch stays
        /// in the table. Batches keep being fetched while each one comes back full.
        /// </remarks>
        public override void ConsumeMsg<T>(string routingKey, int take, Action<List<T>> action, string msgKey = "")
        {
            var manage = new EventBusDataQueueManage();
            while (true)
            {
                var query = manage.GetLambdaQuery().Where(b => b.RoutingKey == routingKey && b.Time < DateTime.Now).OrderBy(b => b.Priority, true).CompileToSp(false);
                if (!string.IsNullOrEmpty(_queueConfig.GroupName))
                {
                    query.Where(b => b.GroupName == _queueConfig.GroupName);
                }
                if (!string.IsNullOrEmpty(msgKey))
                {
                    query.Where(b => b.MsgKey == msgKey);
                }
                var allData = query.OrderBy(b => b.Time, false).Take(take).ToList();
                if (!allData.Any())
                {
                    return;
                }
                var datas = allData.Select(b => b.Data.ToObject<T>()).ToList();
                action(datas);
                // Materialize the ids once: they are used by the delete and by the batch-size check.
                var ids = allData.Select(b => b.Id).ToArray();
                manage.Delete(b => ids.Contains(b.Id));
                if (ids.Length != take)
                {
                    // A partial batch means nothing more was due at query time.
                    return;
                }
            }
        }
    }
    /// <summary>
    /// Columns shared by the read and write mappings of the queue table.
    /// </summary>
    class DataBase : IData
    {
        /// <summary>
        /// Serialized message payload; the publisher stores the message's <c>ToJson()</c> output here.
        /// Declared with a field length of 5000.
        /// </summary>
        [Field(Length = 5000)]
        public string Data
        {
            get; set;
        }
        /// <summary>
        /// Time from which the row is due; consumers only select rows whose Time is earlier than now.
        /// Initialized to <c>DateTime.Now</c> and overwritten by the publisher with now plus the delay.
        /// Carries a non-clustered index.
        /// </summary>
        [Field(FieldIndexType = FieldIndexType.非聚集)]
        public DateTime Time
        {
            get; set;
        } = DateTime.Now;
        /// <summary>
        /// Initialized to <c>DateTime.Now</c> when the object is constructed.
        /// </summary>
        public DateTime CreateTime
        {
            get; set;
        } = DateTime.Now;
        /// <summary>
        /// Priority copied from the publish options; consumer queries sort by it first.
        /// </summary>
        public int Priority { get; set; }
        /// <summary>
        /// Retry counter; increased by one each time the row is re-published.
        /// </summary>
        public int RetryTimes { get; set; }
        /// <summary>
        /// Routing key the message was published under; consumers match it against subscription names.
        /// Non-clustered index, length 100.
        /// </summary>
        [Field(FieldIndexType = FieldIndexType.非聚集, Length = 100)]
        public string RoutingKey { get; set; }
        /// <summary>
        /// Group name taken from the queue configuration at publish time; consumers filter on it when
        /// their configuration has a non-empty group name. Non-clustered index, length 100.
        /// </summary>
        [Field(FieldIndexType = FieldIndexType.非聚集, Length = 100)]
        public string GroupName { get; set; }
        /// <summary>
        /// Delay requested at publish time, in milliseconds (DelaySecond * 1000 when a second-based delay was given).
        /// </summary>
        public int DelayMs { get; set; }
        /// <summary>
        /// Optional message key from the publish options; used to consume or delete specific messages.
        /// Non-clustered index, length 100.
        /// </summary>
        [Field(FieldIndexType = FieldIndexType.非聚集, Length = 100)]
        public string MsgKey { get; set; }
    }
    /// <summary>
    /// Read-side mapping of the "_EventBusDataQueue" table: the shared columns plus the row id,
    /// which consumers use to update and delete rows.
    /// </summary>
    [Table(TableName = "_EventBusDataQueue")]
    class _EventBusDataQueue : DataBase
    {
        //[Field(FieldIndexType = FieldIndexType.非聚集, Length =100)]
        //public string QueueName { get; set; }
        /// <summary>
        /// Row identifier.
        /// </summary>
        public long Id { get; set; }
        //public new int Priority { get; set; }
    }
    /// <summary>
    /// Write-side mapping of the same "_EventBusDataQueue" table, used by the publish methods.
    /// It declares no Id member. NOTE(review): presumably so inserts leave the id to the database — confirm.
    /// </summary>
    [Table(TableName = "_EventBusDataQueue")]
    class _EventBusDataQueueWriter : DataBase
    {
        //[Field(FieldIndexType = FieldIndexType.非聚集, Length = 100)]
        //public string QueueName { get; set; }
        //public new int Priority { get; set; }
    }
    /// <summary>
    /// Data provider for reading, updating and deleting queue rows.
    /// </summary>
    class EventBusDataQueueManage : BaseProvider<_EventBusDataQueue>
    {
        /// <summary>
        /// Name under which this provider is registered.
        /// </summary>
        public override string ManageName
        {
            get { return "_eventBusQueue"; }
        }
    }
    /// <summary>
    /// Data provider for inserting queue rows.
    /// </summary>
    class EventBusDataQueueManageWriter : BaseProvider<_EventBusDataQueueWriter>
    {
        /// <summary>
        /// Name under which this provider is registered.
        /// </summary>
        public override string ManageName
        {
            get { return "_eventBusQueue"; }
        }
    }
    //    class DbPublisher : AbsPublisher
    //    {
    //#if NETSTANDARD 
    //        public DbPublisher(Microsoft.Extensions.Options.IOptions<QueueConfig> options) : base(options.Value)
    //        {
    //            queue = QueueFactory.CreateClient(options.Value.GetMqSetting(MQType.Db), false);
    //        }
    //#endif
    //        public DbPublisher(QueueConfig _queueConfig) : base(_queueConfig)
    //        {
    //            queue = QueueFactory.CreateClient(_queueConfig.GetMqSetting(MQType.Db), false);
    //        }
    //    }
}
